{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Distributed Deep Learning with Spark on CIFAR 10 Dataset:\n",
    "![CIFAR-10](https://cntk.ai/jup/201/cifar-10.png)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mmlspark import CNTKLearner\n",
    "import os, tarfile, pickle\n",
    "import urllib.request\n",
    "cdnURL = \"https://amldockerdatasets.azureedge.net\"\n",
    "# Please note that this is a copy of the CIFAR10 dataset originally found here:\n",
    "# http://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz\n",
    "dataFile = \"cifar-10-python.tar.gz\"\n",
    "dataURL = cdnURL + \"/CIFAR10/\" + dataFile\n",
    "if not os.path.isfile(dataFile):\n",
    "    urllib.request.urlretrieve(dataURL, dataFile)\n",
    "with tarfile.open(dataFile, \"r:gz\") as f:\n",
    "    test_dict = pickle.load(f.extractfile(\"cifar-10-batches-py/test_batch\"),\n",
    "                            encoding=\"latin1\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Used for debugging\n",
    "import socket\n",
    "print(socket.gethostname())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the images with labels from CIFAR dataset,\n",
    "# reformat the labels using OneHotEncoder\n",
    "import array\n",
    "from pyspark.sql.functions import udf\n",
    "from pyspark.ml.linalg import Vectors, VectorUDT\n",
    "from pyspark.ml.feature import OneHotEncoderEstimator\n",
    "from pyspark.sql.functions import col\n",
    "from pyspark.sql.types import *\n",
    "\n",
    "def reshape_image(record):\n",
    "    image, label, filename = record\n",
    "    data = [float(x) for x in image.reshape(3,32,32).flatten()]\n",
    "    return data, label, filename\n",
    "\n",
    "convert_to_double = udf(lambda x: x, ArrayType(DoubleType()))\n",
    "\n",
    "image_rdd = zip(test_dict[\"data\"], test_dict[\"labels\"], test_dict[\"filenames\"])\n",
    "image_rdd = spark.sparkContext.parallelize(image_rdd).map(reshape_image)\n",
    "\n",
    "imagesWithLabels = image_rdd.toDF([\"images\", \"labels\", \"filename\"])\n",
    "\n",
    "list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())\n",
    "\n",
    "imagesWithLabels = imagesWithLabels.withColumn(\n",
    "                       \"images\",\n",
    "                       list_to_vector_udf(convert_to_double(col(\"images\")))) \\\n",
    "                       .select(\"images\", \"labels\")\n",
    "\n",
    "ohe = OneHotEncoderEstimator() \\\n",
    "        .setInputCols([\"labels\"]).setOutputCols([\"tmplabels\"]) \\\n",
    "        .setDropLast(False)\n",
    "imagesWithLabels = ohe.transform(imagesWithLabels) \\\n",
    "                      .select(\"images\", \"tmplabels\") \\\n",
    "                      .withColumnRenamed(\"tmplabels\", \"labels\")\n",
    "\n",
    "imagesWithLabels.printSchema()\n",
    "\n",
    "imagesWithLabels.cache()\n",
    "print(imagesWithLabels.count())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define the neural network to be trained via CNTK's brainscript file notation\n",
    "brainscriptText = \"\"\"\n",
    "    # ConvNet applied on CIFAR-10 dataset, with no data augmentation.\n",
    "\n",
    "    parallelTrain = true\n",
    "\n",
    "    TrainNetwork = {\n",
    "        action = \"train\"\n",
    "\n",
    "        BrainScriptNetworkBuilder = {\n",
    "            imageShape = 32:32:3\n",
    "            labelDim = 10\n",
    "\n",
    "            featMean = 128\n",
    "            featScale = 1/256\n",
    "            Normalize{m,f} = x => f .* (x - m)\n",
    "\n",
    "            model = Sequential (\n",
    "                Normalize {featMean, featScale} :\n",
    "                ConvolutionalLayer {64, (3:3), pad = true} : ReLU :\n",
    "                ConvolutionalLayer {64, (3:3), pad = true} : ReLU :\n",
    "                  MaxPoolingLayer {(3:3), stride = (2:2)} :\n",
    "                ConvolutionalLayer {64, (3:3), pad = true} : ReLU :\n",
    "                ConvolutionalLayer {64, (3:3), pad = true} : ReLU :\n",
    "                  MaxPoolingLayer {(3:3), stride = (2:2)} :\n",
    "                DenseLayer {256} : ReLU : Dropout :\n",
    "                DenseLayer {128} : ReLU : Dropout :\n",
    "                LinearLayer {labelDim}\n",
    "            )\n",
    "\n",
    "            # inputs\n",
    "            features = Input {imageShape}\n",
    "            labels   = Input {labelDim}\n",
    "\n",
    "            # apply model to features\n",
    "            z = model (features)\n",
    "\n",
    "            # connect to system\n",
    "            ce       = CrossEntropyWithSoftmax     (labels, z)\n",
    "            errs     = ClassificationError         (labels, z)\n",
    "            top5Errs = ClassificationError         (labels, z, topN=5)  # only used in Eval action\n",
    "\n",
    "            featureNodes    = (features)\n",
    "            labelNodes      = (labels)\n",
    "            criterionNodes  = (ce)\n",
    "            evaluationNodes = (errs)  # top5Errs only used in Eval\n",
    "            outputNodes     = (z)\n",
    "        }\n",
    "\n",
    "        SGD = {\n",
    "            epochSize = 0\n",
    "            minibatchSize = 32\n",
    "\n",
    "            learningRatesPerSample = 0.0015625*10:0.00046875*10:0.00015625\n",
    "            momentumAsTimeConstant = 0*20:607.44\n",
    "            maxEpochs = 30\n",
    "            L2RegWeight = 0.002\n",
    "            dropoutRate = 0*5:0.5\n",
    "\n",
    "            numMBsToShowResult = 100\n",
    "            parallelTrain = {\n",
    "                parallelizationMethod = \"DataParallelSGD\"\n",
    "                parallelizationStartEpoch = 2  # warm start: don't use 1-bit SGD for first epoch\n",
    "                distributedMBReading = true\n",
    "                dataParallelSGD = { gradientBits = 1 }\n",
    "            }\n",
    "        }\n",
    "    }\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Split the images with labels into a train and test data\n",
    "train, test = imagesWithLabels.randomSplit([0.6, 0.4], seed=123)\n",
    "train.printSchema()\n",
    "train.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Specify the working directory and GPU node name and GPU count\n",
    "workingDir = \"file:/tmp/gpuwork/\"\n",
    "gpum = [\"mygpuvm,4\"]\n",
    "print(\"Working in \" + workingDir)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Train the distributed learner using the VM configured above\n",
    "learner = CNTKLearner(brainScript=brainscriptText, dataTransfer=\"hdfs\",\n",
    "                      gpuMachines=gpum, workingDir=workingDir)\n",
    "              .fit(train)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Evaluate the model\n",
    "scoredImages = learner.setOutputNodeName(\"z\") \\\n",
    "                      .setInputCol(\"images\").setOutputCol(\"scored\") \\\n",
    "                      .transform(test)\n",
    "scoredImages.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Transform the log probabilities to predictions\n",
    "def argmax(x): return max(enumerate(x),key=lambda p: p[1])[0]\n",
    "argmaxUDF = udf(argmax, IntegerType())\n",
    "imagePredictions = scoredImages.withColumn(\"predictions\", argmaxUDF(\"scored\"))\\\n",
    "                               .withColumn(\"labels\", argmaxUDF(\"labels\")) \\\n",
    "                               .select(\"predictions\", \"labels\")\n",
    "imagePredictions.registerTempTable(\"ImagePredictions\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql -q -o imagePredictions\n",
    "select * from ImagePredictions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%local\n",
    "y, y_hat = imagePredictions[\"labels\"], imagePredictions[\"predictions\"]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%matplotlib inline\n",
    "\n",
    "import matplotlib.pyplot as plt\n",
    "import numpy as np\n",
    "from sklearn.metrics import confusion_matrix\n",
    "\n",
    "cm = confusion_matrix(y, y_hat)\n",
    "\n",
    "labels = [\"airplane\", \"automobile\", \"bird\", \"cat\", \"deer\", \"dog\", \"frog\",\n",
    "          \"horse\", \"ship\", \"truck\"]\n",
    "plt.imshow(cm, interpolation=\"nearest\", cmap=plt.cm.Blues)\n",
    "plt.colorbar()\n",
    "tick_marks = np.arange(len(labels))\n",
    "plt.xticks(tick_marks, labels, rotation=90)\n",
    "plt.yticks(tick_marks, labels)\n",
    "plt.xlabel(\"Predicted label\")\n",
    "plt.ylabel(\"True Label\")\n",
    "plt.show()"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
